package com.xiaofan

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

import java.time.Duration
import java.util

object LoginFailWithCep {

  /** Input CSV used when no path is given on the command line (the original author's local path). */
  private val DefaultInputPath: String =
    "D:\\big-data\\code\\UserBehaviorAnalysis\\LoginFailDetect\\src\\main\\resources\\LoginLog.csv"

  /**
   * Flags users who fail to log in twice in a row within 2 seconds of event time, using Flink CEP,
   * and prints one [[LoginFailWarning]] per match.
   *
   * @param args optional; if present, `args(0)` is the input CSV path, otherwise
   *             [[DefaultInputPath]] is read
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    // Deprecated since Flink 1.12, where event time is already the default; kept so the job
    // still runs on event time with older Flink versions that defaulted to processing time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val inputPath: String = args.headOption.getOrElse(DefaultInputPath)
    val inputStream: DataStream[String] = env.readTextFile(inputPath)

    // Each line holds four comma-separated fields mapped positionally onto LoginEvent
    // (presumably userId, ip, eventType, timestamp — confirm against LoginEvent).
    // Blank lines are skipped and fields are trimmed so stray whitespace does not
    // break `toLong` or the `eventType == "fail"` comparison below.
    val dataStream: DataStream[LoginEvent] = inputStream
      .filter(_.trim.nonEmpty)
      .map { line =>
        val fields: Array[String] = line.split(",").map(_.trim)
        LoginEvent(fields(0).toLong, fields(1), fields(2), fields(3).toLong)
      }
      .assignTimestampsAndWatermarks {
        // Tolerate events arriving up to 3 seconds out of order.
        WatermarkStrategy
          .forBoundedOutOfOrderness[LoginEvent](Duration.ofSeconds(3))
          .withTimestampAssigner(new SerializableTimestampAssigner[LoginEvent] {
            // The record's timestamp is multiplied by 1000, i.e. treated as seconds and
            // converted to the epoch milliseconds Flink expects.
            override def extractTimestamp(element: LoginEvent, recordTimestamp: Long): Long =
              element.timestamp * 1000L
          })
      }

    // 1. Define the pattern: a failed login immediately followed (strict contiguity via `next`)
    //    by another failed login, with the whole match completing within 2 seconds.
    val loginFailPattern: Pattern[LoginEvent, LoginEvent] = Pattern
      .begin[LoginEvent]("firstFail").where(_.eventType == "fail")
      .next("secondFail").where(_.eventType == "fail")
      .within(Time.seconds(2))

    // 2. Apply the pattern per user, so only the same user's events can form a match.
    val patternStream: PatternStream[LoginEvent] =
      CEP.pattern(dataStream.keyBy(_.userId), loginFailPattern)

    // 3. Extract each match and turn it into a warning via `select`.
    val cepStream: DataStream[LoginFailWarning] = patternStream.select(new LoginFailEventMatch())

    cepStream.print()

    env.execute("cep test")
  }
}


/**
 * Converts one CEP match of the "firstFail" -> "secondFail" pattern into a [[LoginFailWarning]].
 *
 * Flink passes the matched events grouped by pattern-step name. The steps are expected to
 * hold a single event each (presumably, since the pattern uses no quantifiers), so only the
 * first element of each list is read.
 */
class LoginFailEventMatch extends PatternSelectFunction[LoginEvent, LoginFailWarning] {
  override def select(pattern: util.Map[String, util.List[LoginEvent]]): LoginFailWarning = {
    val firstStepEvents: util.List[LoginEvent] = pattern.get("firstFail")
    val secondStepEvents: util.List[LoginEvent] = pattern.get("secondFail")

    val earlierFail: LoginEvent = firstStepEvents.get(0)
    val laterFail: LoginEvent = secondStepEvents.iterator().next()

    // The warning spans from the first failure's timestamp to the second one's.
    LoginFailWarning(
      earlierFail.userId,
      earlierFail.timestamp,
      laterFail.timestamp,
      "login fail"
    )
  }
}























